Azure Firewall/Script - Migrate Checkpoint config to Azure Firewall Policy/chkp2azfw.py (620 lines of code) (raw):
import argparse
import json
import re
import os
import sys
import copy
# https://docs.python.org/3/library/ipaddress.html
import ipaddress
# Helper functions
# Arguments
parser = argparse.ArgumentParser(description='Generate an ARM template to create a Rule Collection Group from a Checkpoint ruleset exported with the Show Package Tool (https://support.checkpoint.com/results/sk/sk120342).')
parser.add_argument('--json-index-file', dest='json_index_file', action='store',
default="./index.json",
help='Local file containing in JSON the links to the rest of the exported JSON files. The default is "./index.json"')
parser.add_argument('--policy-name', dest='policy_name', action='store',
default="azfwpolicy",
help='Name for the Azure Firewall Policy. The default is "azfwpolicy"')
parser.add_argument('--policy-sku', dest='policy_sku', action='store',
default="Standard",
help='SKU for the Azure Firewall Policy. Possible values: Standard, Premium (default: Standard)')
parser.add_argument('--do-not-create-policy', dest='dont_create_policy', action='store_true',
default=False,
help='If specified, do not include ARM code for the policy, only for the rule collection group. Use if the policy already exists.')
parser.add_argument('--rcg-name', dest='rcg_name', action='store',
default="importedFromCheckpoint",
help='Name for the Rule Collection Group to create in the Azure Firewall Policy. The default is "importedFromCheckpoint"')
parser.add_argument('--rcg-priority', dest='rcg_prio', action='store',
default="10000",
help='Priority for the Rule Collection Group to create in the Azure Firewall Policy. The default is "10000"')
parser.add_argument('--no-ip-groups', dest='use_ipgroups', action='store_false',
default=True,
help='Whether some address groups should be converted to Azure IP Groups (default: True)')
parser.add_argument('--no-app-rules', dest='use_apprules', action='store_false',
default=True,
help='Whether it will be attempted to convert network rules using HTTP/S to application rules. Note that this might be a problem if a explicit network deny exists (default: True)')
parser.add_argument('--max-ip-groups', dest='max_ipgroups', action='store', type=int, default=50,
help='Optional, maximum number of IP groups that will be created in Azure')
parser.add_argument('--rule-uid-to-name', dest='rule_id_to_name', action='store_true',
default=False,
help='Includes the UID of the Checkpoint rule in the name of the Azure rule, useful for troubleshooting (default: False)')
parser.add_argument('--remove-explicit-deny', dest='remove_explicit_deny', action='store_true',
default=False,
help='If a deny any/any is found, it will not be converted to the Azure Firewall syntax. Useful if using application rules (default: False)')
parser.add_argument('--output', dest='output', action='store',
default="none",
help='Output format. Possible values: json, none')
parser.add_argument('--pretty', dest='pretty', action='store_true',
default=False,
help='Print JSON in pretty mode (default: False)')
parser.add_argument('--log-level', dest='log_level_string', action='store',
default='warning',
help='Logging level (valid values: error/warning/info/debug/all/none. Default: warning)')
args = parser.parse_args()
# Variables
az_app_rcs = []
az_net_rcs = []
ipgroups = []
discarded_rules = []
rcg_name = args.rcg_name
rcg_prio = args.rcg_prio
rc_net_name = 'from-chkp-net'
rc_net_prio_start = "10000"
rc_app_name = 'from-chkp-app'
rc_app_prio_start = "11000"
cnt_apprules = 0
cnt_allow = 0
cnt_deny = 0
cnt_disabledrules = 0
cnt_apprules = 0
cnt_netrules_ip = 0
cnt_netrules_fqdn = 0
cnt_chkp_rules = 0
# Returns true if the string is a number
def is_number(value):
for character in value:
if character.isdigit():
return True
return False
# Returns a string formatted to be used as a name in Azure
def format_to_arm_name(name):
name = name.replace(".", "-")
name = name.replace("/", "-")
name = name.replace(" ", "_")
return name
# Returns true if the string is a UID
def is_uid(value):
if len(value) == 36 and value[8] == '-' and value[13] == '-' and value[18] == '-' and value[23] == '-':
return True
# Finds an object in a list by its UID
def find_uid(object_list, uid):
for object in object_list:
if object['uid'] == uid:
return object
return None
# Returns true if there is an IP group with the same chkp id
def is_ipgroup(ipgroup_list, uid):
for ipgroup in ipgroup_list:
if ipgroup['id'] == uid:
return True
return False
# Returns IP Group corresponding to the chkp id
def find_ipgroup(ipgroup_list, uid):
for ipgroup in ipgroup_list:
if ipgroup['id'] == uid:
return ipgroup
return None
# True if parameter is a valid FQDN according to RFCs 952, 1123
def is_fqdn(str_var):
return bool(re.match(r"(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}(?<!-)\.)+[a-zA-Z]{2,4}$)",str(str_var)))
# True if parameter is a valid IP address (with or without mask)
# The regex is quite simple (for example it would match 999.999.999.999/99), but we assume that the IP addresses in the original policy are valid
def is_ipv4(str_var):
return bool(re.match(r"^([0-9]{1,3}\.){3}[0-9]{1,3}($|/[0-9]{1,2}$)",str(str_var)))
# Perform some checks on the rule to add, and append it to the list of rules provided in the 2nd argument
# Some rules need to be broken down in multiple ones, so the function adds a suffix to the created rules in this case
def append_rule(rule_to_be_appended, rules_to_append_to):
if log_level >= 8:
print("DEBUG: appending to rules:", str(rule_to_be_appended), file=sys.stderr)
src_fields = ('sourceAddresses', 'sourceIpGroups', 'sourceServiceTags')
dst_fields = ('destinationAddresses', 'destinationIpGroups', 'destinationFqdns', 'destinationServiceTags')
all_fields = src_fields + dst_fields
# Count how many rules we will be splitting (to avoid unnecessary suffixes if there is only one rule)
total_rule_no = 0
for src_field in src_fields:
for dst_field in dst_fields:
if len(rule_to_be_appended[src_field]) > 0 and len(rule_to_be_appended[dst_field]) > 0:
total_rule_no += 1
# Process the rule
split_rule_counter = 0
for src_field in src_fields:
for dst_field in dst_fields:
# Only look at combinations where the src_field and dst_field are non-zero
if len(rule_to_be_appended[src_field]) > 0 and len(rule_to_be_appended[dst_field]) > 0:
# Should we split a rule that contains both IP addresses and service tags in either sourceAddresses or destinationAddresses?
temp_rule = copy.copy(rule_to_be_appended)
split_rule_counter += 1
if total_rule_no > 1:
temp_rule['name'] = temp_rule['name'] + '-' + str(split_rule_counter)
else:
temp_rule['name'] = temp_rule['name']
# Blank all the rest fields
for blank_field in all_fields:
if blank_field != src_field and blank_field != dst_field:
temp_rule [blank_field] = []
rules_to_append_to.append(temp_rule)
# The fields 'sourceServiceTags' and 'destinationServiceTags' are not supported in Azure Firewall, so we need to change them to 'sourceAddresses' and 'destinationAddresses'
if src_field == 'sourceServiceTags':
temp_rule['sourceAddresses'] = temp_rule['sourceServiceTags']
temp_rule.pop('sourceServiceTags')
if dst_field == 'destinationServiceTags':
temp_rule['destinationAddresses'] = temp_rule['destinationServiceTags']
temp_rule.pop('destinationServiceTags')
if split_rule_counter > 1:
if log_level >= 7:
print("DEBUG: Checkpoint rule {0} has been split in {1} Azure Firewall rules".format(rule_to_be_appended['name'], split_rule_counter), file=sys.stderr)
return rules_to_append_to
# Recursively finds all members of objects by their UID
def find_members(object_group_list, uid_list, member_list=[], debug=False, mode='ip'):
# if debug:
# print("DEBUG: looking for UIDs '{0}'...".format(str(uid_list)), file=sys.stderr)
# Make sure that the uid is a list
if not isinstance(uid_list, list):
uid_list = [uid_list]
# Loop through all objects
for object_group in object_group_list:
if object_group['uid'] in uid_list:
# if debug:
# print('DEBUG: found matching object', str(object_group), file=sys.stderr)
if 'members' in object_group:
if len(object_group['members']) > 0:
for member in object_group['members']:
if is_uid(member):
member_list = find_members(object_group_list, member, member_list=member_list)
else:
if debug:
print('DEBUG: object group {0} has no members.'.format(str(object_group['name'])), file=sys.stderr)
elif object_group['type'] == 'network':
member_list.append(object_group['subnet4'] + '/' + str(object_group['mask-length4']))
elif object_group['type'] == 'host':
member_list.append(object_group['ipv4-address'] + '/32')
elif object_group['type'] == 'dns-domain':
member_list.append(str(object_group['name'])[1:]) # In checkpoint syntax, fqdn starts with a dot
elif object_group['type'] == 'dynamic-object': # Service Tag "AVDServiceRanges"
if debug:
print('DEBUG: adding dynamic-object {0}'.format(object_group['name']), str(object_group), file=sys.stderr)
if object_group['name'] == 'AVDServiceRanges':
member_list.append('WindowsVirtualDesktop')
else:
if log_level >= 3:
print('ERROR: dynamic-object {0} cannot be mapped to an Azure service tag'.format(object_group['name']), file=sys.stderr)
elif object_group['type'] == 'service-tcp':
member_list.append(('tcp', object_group['port']))
elif object_group['type'] == 'service-udp':
member_list.append(('udp', object_group['port']))
elif object_group['type'] == 'service-icmp':
member_list.append(('icmp', '*'))
elif object_group['type'] == 'CpmiAnyObject':
if (mode == 'ip'):
member_list.append('*')
else:
member_list.append(('any', '*'))
elif object_group['type'] == 'RulebaseAction':
member_list.append(object_group['name'])
elif object_group['type'] in ('CpmiGatewayCluster', 'CpmiClusterMember', 'CpmiHostCkp', 'simple-cluster', 'Global'):
if debug:
print('DEBUG: ignoring object type', object_group['type'], file=sys.stderr)
else:
if debug:
print('DEBUG: unknown object type', object_group['type'], file=sys.stderr)
return list(set(member_list))
# Set log_level
if is_number(args.log_level_string):
try:
log_level = int(args.log_level_string)
except:
log_level = 4
else:
if args.log_level_string == 'error':
log_level = 3
elif args.log_level_string == 'warning':
log_level = 4
elif args.log_level_string == 'notice':
log_level = 5
elif args.log_level_string == 'info':
log_level = 6
elif args.log_level_string == 'debug' or args.log_level_string == 'all':
log_level = 7
elif args.log_level_string == 'debugplus' or args.log_level_string == 'all':
log_level = 8
elif args.log_level_string == 'none':
log_level = 0
else:
log_level = 4 # We default to 'warning'
# Get JSON index file list from the specified folder
if log_level > 7:
print ("DEBUG: Loading file {0}...".format(args.json_index_file), file=sys.stderr)
try:
with open(args.json_index_file) as f:
json_index = json.load(f)
except Exception as e:
if log_level >= 3:
print("ERROR: Error when opening JSON index file", args.json_index_file, "-", str(e), file=sys.stderr)
sys.exit(0)
# Go through the files and create the objects
access_layers = []
threat_layers = []
nat_layers = []
for package in json_index['policyPackages']:
if 'objects' in package:
if log_level >= 7:
print ("DEBUG: Objects section found, file {0}...".format(package['objects']['htmlObjectsFileName']), file=sys.stderr)
filename = package['objects']['htmlObjectsFileName']
try:
# Try to open the file with JSON extension
filename = os.path.splitext(package['objects']['htmlObjectsFileName'])[0]+'.json'
with open(filename) as f:
policy_objects = json.load(f)
if log_level >= 7:
print ("DEBUG: File {0} loaded successfully".format(filename), file=sys.stderr)
except Exception as e:
if log_level >= 4:
print("WARNING: Error when opening JSON file", filename, "-", str(e), file=sys.stderr)
pass
if 'accessLayers' in package:
for layer in package['accessLayers']:
if 'htmlFileName' in layer:
if log_level >= 7:
print ("DEBUG: Access layer found, file {0}...".format(layer['htmlFileName']), file=sys.stderr)
filename = layer['htmlFileName']
try:
# Try to open the file with JSON extension
filename = os.path.splitext(layer['htmlFileName'])[0]+'.json'
with open(filename) as f:
access_layers.append(json.load(f))
if log_level >= 7:
print ("DEBUG: File {0} loaded successfully".format(filename), file=sys.stderr)
except Exception as e:
if log_level >= 4:
print("WARNING: Error when opening JSON file for access layer", filename, "-", str(e), file=sys.stderr)
pass
if 'threatLayers' in package:
for layer in package['threatLayers']:
if 'htmlFileName' in layer:
if log_level >= 7:
print ("DEBUG: Threat layer found, file {0}...".format(layer['htmlFileName']), file=sys.stderr)
filename = layer['htmlFileName']
try:
filename = os.path.splitext(layer['htmlFileName'])[0] + '.json'
with open(filename) as f:
threat_layers.append(json.load(f))
if log_level >= 7:
print ("DEBUG: File {0} loaded successfully".format(filename), file=sys.stderr)
except Exception as e:
if log_level >= 4:
print("WARNING: Error when opening JSON file for threat layer", filename, "-", str(e), file=sys.stderr)
pass
if 'natLayer' in package:
layer = package['natLayer']
if 'htmlFileName' in layer:
if log_level >= 7:
print ("DEBUG: NAT layer found, file {0}...".format(layer['htmlFileName']), file=sys.stderr)
filename = layer['htmlFileName']
try:
# Try to open the file with JSON extension
filename = os.path.splitext(layer['htmlFileName'])[0]+'.json'
with open(filename) as f:
# nat_layer = json.load(f)
nat_layers.append(json.load(f))
if log_level >= 7:
print ("DEBUG: File {0} loaded successfully".format(filename), file=sys.stderr)
except Exception as e:
if log_level >= 4:
print("WARNING: Error when opening JSON file for NAT layer", filename, "-", str(e), file=sys.stderr)
pass
# Inspect the imported objects
# policy_object_types = []
# for policy_object in policy_objects:
# if 'type' in policy_object:
# if not policy_object['type'] in policy_object_types:
# policy_object_types.append(policy_object['type'])
# if log_level >= 7:
# print('Policy object types found:', str(policy_object_types))
# Policy object types found: ['vpn-community-star', 'RulebaseAction', 'CpmiAnyObject', 'service-group', 'group', 'Track', 'Global', 'service-tcp', 'network', 'dynamic-object', 'host', 'CpmiHostCkp', 'service-icmp', 'service-other', 'threat-profile', 'ThreatExceptionRulebase', 'service-udp', 'dns-domain', 'simple-cluster', 'CpmiClusterMember']
# Inspect the imported access layers
def inspect_access_layers(layer_list):
for layer in layer_list:
for rule in layer:
# Check rule is a dictionary and contains a type key
if isinstance(rule, dict) and 'type' in rule:
if rule['type'] == 'access-rule':
# Rule Name
rule_name = rule['name'] if len(rule['name']) <= 38 else rule['name'][:38]
# action/src/dst/svc object Members
rule_action_members_str = str(find_members(policy_objects, rule['action'], member_list=[])[0])
rule_src_members = find_members(policy_objects, rule['source'], member_list=[], mode='ip')
rule_src_members_str = str(rule_src_members) if len(str(rule_src_members)) <= 38 else str(rule_src_members)[:38]
rule_dst_members = find_members(policy_objects, rule['destination'], member_list=[], mode='ip')
rule_dst_members_str = str(rule_dst_members) if len(str(rule_dst_members)) <= 38 else str(rule_dst_members)[:38]
rule_svc_members = find_members(policy_objects, rule['service'], member_list=[], mode='svc')
rule_svc_members_str = str(rule_svc_members) if len(str(rule_svc_members)) <= 38 else str(rule_svc_members)[:38]
# For each group ID used as source or destination, create an IP group object
if len(rule_src_members) > 0:
for src in rule['source']:
if not is_ipgroup(ipgroups, src):
ipgroups.append({'id': src, 'members': rule_src_members, 'member_count': len(rule_src_members), 'name': find_uid(policy_objects, src)['name']})
if len(rule_dst_members) > 0:
for dst in rule['destination']:
if not is_ipgroup(ipgroups, dst):
ipgroups.append({'id': dst, 'members': rule_dst_members, 'member_count': len(rule_dst_members), 'name': find_uid(policy_objects, dst)['name']})
elif rule['type'] == 'nat-rule':
if log_level >= 7:
print('DEBUG: processing NAT rule', rule['rule-number'], file=sys.stderr)
elif rule['type'] == 'threat-rule':
if log_level >= 7:
print('DEBUG: processing Threat rule', rule['rule-number'], file=sys.stderr)
else:
if log_level >= 7:
print('DEBUG: ignoring rule of type', rule['type'], file=sys.stderr)
else:
print('ERROR: Rule is not a dictionary or does not contain a type key:', str(rule), file=sys.stderr)
def print_access_layer_rule(layer_list, rule_id_list, debug=False):
for layer in layer_list:
if log_level >= 7:
print('{0:<40}{1:<40}{2:<40}{3:<40}{4:<40}'.format('Name', 'Action', 'Source', 'Destination', 'Service'), file=sys.stderr)
for rule in layer:
# Check rule is a dictionary and contains a type key
if isinstance(rule, dict) and 'type' in rule:
if rule['type'] == 'access-rule' and rule['uid'] in rule_id_list:
# Rule Name
rule_name = rule['name'] if len(rule['name']) <= 38 else rule['name'][:38]
# action/src/dst/svc object Members
rule_action_members_str = str(find_members(policy_objects, rule['action'], member_list=[])[0])
rule_src_members = find_members(policy_objects, rule['source'], member_list=[], mode='ip', debug=debug)
rule_src_members_str = str(rule_src_members) if len(str(rule_src_members)) <= 38 else str(rule_src_members)[:38]
rule_dst_members = find_members(policy_objects, rule['destination'], member_list=[], mode='ip', debug=debug)
rule_dst_members_str = str(rule_dst_members) if len(str(rule_dst_members)) <= 38 else str(rule_dst_members)[:38]
rule_svc_members = find_members(policy_objects, rule['service'], member_list=[], mode='svc', debug=debug)
rule_svc_members_str = str(rule_svc_members) if len(str(rule_svc_members)) <= 38 else str(rule_svc_members)[:38]
# Print
if log_level >= 7:
print('{0:<40}{1:<40}{2:<40}{3:<40}{4:<40}'.format(rule_name, rule_action_members_str, rule_src_members_str, rule_dst_members_str, rule_svc_members_str), file=sys.stderr)
# Process the imported access layers. inspect_access_layers needs to have run first to create the list of IP groups
def process_access_layers(layer_list, ipgroups):
global cnt_netrules_ip, cnt_netrules_fqdn, cnt_chkp_rules
last_action = None
for layer in layer_list:
for rule in layer:
# Check rule is a dictionary and contains a type key
if isinstance(rule, dict) and 'type' in rule:
if rule['type'] == 'access-rule':
cnt_chkp_rules += 1
# Rule Name and action
rule_name = rule['name']
rule_action = str(find_members(policy_objects, rule['action'], member_list=[])[0])
# If there is a change from deny to allow, or from allow to deny, or if this is the first rule, we need to create a rule collection
if rule_action != last_action:
rule_collection = {
'name': rc_net_name + '-' + rule_action + '-' + str(len(az_net_rcs)),
'action': rule_action,
'rules': []
}
# Append the rule collection to the list of rule collections and set last_action to the new value
az_net_rcs.append(rule_collection)
last_action = rule_action
# action/src/dst/svc object Members
rule_src_members = find_members(policy_objects, rule['source'], member_list=[], mode='ip')
rule_dst_members = find_members(policy_objects, rule['destination'], member_list=[], mode='ip')
rule_svc_members = find_members(policy_objects, rule['service'], member_list=[], mode='svc')
# Print
if len(rule_src_members) > 0 and len(rule_dst_members) > 0 and len(rule_svc_members) > 0:
# 'sourceServiceTags' and 'destinationServiceTags' are auxiliary fields, since the service tags go actually in the 'sourceAddresses' and 'destinationAddresses' fields
# The fields will be removed in the function append_rule
new_rule = {
'name': rule['name'] + '-' + str(rule['uid']),
'ruleType': 'NetworkRule',
'sourceAddresses': [],
'sourceIpGroups': [],
'destinationAddresses': [],
'destinationFqdns': [],
'destinationIpGroups': [],
'sourceServiceTags': [],
'destinationServiceTags': []
}
if not args.rule_id_to_name:
new_rule['name'] = rule['name']
if len(rule_src_members) == 1 and is_ipgroup(ipgroups, rule_src_members[0]):
new_rule['sourceIpGroups'].append(find_ipgroup(ipgroups, rule_src_members[0]))['name']
else:
for src in rule_src_members:
if src == 'any' or src == '*' or 'any' in src or src[0] == 'any':
new_rule['sourceAddresses'] = [ '*' ]
elif is_ipv4(src):
if src not in new_rule['sourceAddresses']:
new_rule['sourceAddresses'].append(src)
# If not an IP address, it must be a service tag
elif src not in new_rule['sourceAddresses']:
if src not in new_rule['sourceServiceTags']:
new_rule['sourceServiceTags'].append(src)
if len(rule_dst_members) == 1 and is_ipgroup(ipgroups, rule_dst_members[0]):
new_rule['destinationIpGroups'].append(find_ipgroup(ipgroups, rule_dst_members[0]))['name']
else:
for dst in rule_dst_members:
if dst == 'any' or dst == '*' or 'any' in dst:
cnt_netrules_ip += 1
new_rule['destinationAddresses'] = [ '*' ]
elif is_fqdn(dst):
cnt_netrules_fqdn += 1
if dst not in new_rule['destinationFqdns']:
cnt_netrules_fqdn += 1
new_rule['destinationFqdns'].append(dst)
elif is_ipv4(dst):
if dst not in new_rule['destinationAddresses']:
cnt_netrules_ip += 1
new_rule['destinationAddresses'].append(dst)
# If not an IP address or a domain name, it must be a service tag
else:
if dst not in new_rule['destinationServiceTags']:
new_rule['destinationServiceTags'].append(dst)
# Services are in an array of 2-tuples (protocol, port)
if 'any' in rule_svc_members:
new_rule['ipProtocols'] = ['Any']
new_rule['destinationPorts'] = [ '*' ]
else:
new_rule['ipProtocols'] = []
new_rule['destinationPorts'] = []
for svc in rule_svc_members:
protocol = svc[0]
port = svc[1]
if protocol == 'tcp' or protocol == 'udp':
if protocol not in new_rule['ipProtocols']:
new_rule['ipProtocols'].append(protocol)
if port not in new_rule['destinationPorts']:
# Checkpoint accepts the syntax >1024, but Azure does not
if port[0] == '>':
new_rule['destinationPorts'].append(str(int(port[1:]) + 1) + '-65535')
else:
new_rule['destinationPorts'].append(port)
elif protocol == 'icmp':
if protocol not in new_rule['ipProtocols']:
new_rule['ipProtocols'].append(protocol)
new_rule['destinationPorts'] = [ '*' ]
elif protocol == 'any':
new_rule['ipProtocols'] = ['Any']
new_rule['destinationPorts'] = [ '*' ]
else:
print('ERROR: Unknown service protocol', protocol, 'in rule', rule_name, file=sys.stderr)
# Add new rule to the latest rule collection (the one we are working on)
if args.remove_explicit_deny and rule_action == 'Drop' and new_rule['sourceAddresses'] == [ '*' ] and new_rule['destinationAddresses'] == [ '*' ] and new_rule['destinationPorts'] == [ '*' ] and new_rule['ipProtocols'] == ['Any']:
discarded_rules.append(rule['uid'])
if log_level >= 6:
print('INFO: Skipping rule "{0}" as it is an explicit catch-all deny rule'.format(rule_name), file=sys.stderr)
else:
az_net_rcs[-1]['rules'] = append_rule(new_rule, az_net_rcs[-1]['rules'])
# If one of the objects was empty, add to the discarded rules
else:
discarded_rules.append(rule['uid'])
# Inspect the imported NAT layers
def inspect_nat_layers(layer_list):
for layer in layer_list:
print('{0:<5}{1:<20}{2:<20}{3:<20}{4:<20}{5:<20}{6:<20}'.format('ID', 'Original Src', 'Translated Src', 'Original Dst', 'Translated Dst', 'Original Svc', 'Translated Svc'), file=sys.stderr)
for rule in layer:
# Check rule is a dictionary and contains a type key
if isinstance(rule, dict) and 'type' in rule:
if rule['type'] == 'nat-rule':
if log_level >= 7:
# Rule ID
rule_id = rule['rule-number']
# src/dst/svc object Members
rule_osrc_members = find_members(policy_objects, rule['original-source'], member_list=[], mode='ip')
rule_osrc_members_str = str(rule_osrc_members) if len(str(rule_osrc_members)) <= 38 else str(rule_osrc_members)[:38]
rule_tsrc_members = find_members(policy_objects, rule['translated-source'], member_list=[], mode='ip')
rule_tsrc_members_str = str(rule_tsrc_members) if len(str(rule_tsrc_members)) <= 38 else str(rule_tsrc_members)[:38]
rule_odst_members = find_members(policy_objects, rule['original-destination'], member_list=[], mode='ip')
rule_odst_members_str = str(rule_odst_members) if len(str(rule_odst_members)) <= 38 else str(rule_odst_members)[:38]
rule_tdst_members = find_members(policy_objects, rule['translated-destination'], member_list=[], mode='ip')
rule_tdst_members_str = str(rule_tdst_members) if len(str(rule_tdst_members)) <= 38 else str(rule_tdst_members)[:38]
rule_osvc_members = find_members(policy_objects, rule['original-service'], member_list=[], mode='svc')
rule_osvc_members_str = str(rule_osvc_members) if len(str(rule_osvc_members)) <= 38 else str(rule_osvc_members)[:38]
rule_tsvc_members = find_members(policy_objects, rule['translated-service'], member_list=[], mode='svc')
rule_tsvc_members_str = str(rule_tsvc_members) if len(str(rule_tsvc_members)) <= 38 else str(rule_tsvc_members)[:38]
# Print
print('{0:<5}{1:<20}{2:<20}{3:<20}{4:<20}{5:<20}{6:<20}'.format(rule_id, rule_osrc_members_str, rule_tsrc_members_str, rule_odst_members_str, rule_tdst_members_str, rule_osvc_members_str, rule_tsvc_members_str), file=sys.stderr)
else:
if log_level >= 7:
print('DEBUG: ignoring rule of type', rule['type'])
else:
print('ERROR: Rule is not a dictionary or does not contain a type key:', str(rule))
if log_level >= 7:
print('DEBUG: Access layers found:', file=sys.stderr)
inspect_access_layers(access_layers)
# Other types of layers (not required)
# if log_level >= 7:
# print('DEBUG: Threat layers found:')
# inspect_access_layers(threat_layers)
# if log_level >= 7:
# print('DEBUG: NAT layer found:')
# inspect_nat_layers(nat_layers)
# Remove ipgroups that contain FQDNs
ipgroups_copy = ipgroups.copy()
for ipgroup in ipgroups_copy:
for x in ipgroup['members']:
if is_fqdn(x):
if log_level >= 7:
print('DEBUG: Removing IP group', ipgroup['name'], 'because it contains FQDN', x, '(IP Groups can only contain IP addresses)', file=sys.stderr)
ipgroups.remove(ipgroup)
break
if log_level >= 6:
print('INFO: {0} out of {1} IP Groups remain after removing FQDNs'.format(len(ipgroups), len(ipgroups_copy)), file=sys.stderr)
# Show ipgroups
ipgroups = sorted(ipgroups, key=lambda d: d['member_count'], reverse=True)
if log_level >= 6:
print('INFO: {0} IP groups found, capping them to the top {1}'.format(len(ipgroups), args.max_ipgroups), file=sys.stderr)
ipgroups = ipgroups[:args.max_ipgroups]
if log_level >= 8:
print('{0:<50}{1:<38}{2:<5}{3:<80}'.format('IP group name', 'CHKP ID', 'Count', 'IP addresses'), file=sys.stderr)
for ipgroup in ipgroups:
ipgroup_members = str(ipgroup['members']) if len(str(ipgroup['members'])) <= 80 else str(ipgroup['members'])[:80]
print('{0:<50}{1:<38}{2:<5}{3:<50}'.format(ipgroup['name'], ipgroup['id'], str(ipgroup['member_count']), ipgroup_members), file=sys.stderr)
# Check whether any IP group is repeated
if len(list(set([x['id'] for x in ipgroups]))) != len(ipgroups):
if log_level >= 4:
print('ERROR: IP groups with repeated IDs found', file=sys.stderr)
if len(list(set([x['name'] for x in ipgroups]))) != len(ipgroups):
if log_level >= 4:
print('ERROR: IP groups with repeated names found', file=sys.stderr)
# Process rules
process_access_layers(access_layers, ipgroups)
if log_level >= 6:
print('INFO: {0} network rules found, spread across {1} rule collections ({2} allow rules, {3} deny rules)'.format(sum([len(x['rules']) for x in az_net_rcs]), len(az_net_rcs), sum([len(x['rules']) for x in az_net_rcs if x['action'] == 'Accept']), sum([len(x['rules']) for x in az_net_rcs if x['action'] == 'Drop'])), file=sys.stderr)
# Now we should have all rules stored as network rule collections. Check whether any can be transformed in an application rule
# App rules need to go into their own rule collections
def create_app_rules(net_rcs):
last_action = None
app_rcs = []
# Loop through a copy of the rules (you cannot change a list while looping through it)
net_rcs_copy = net_rcs.copy()
for net_rc in net_rcs_copy:
for net_rule in net_rc['rules']:
# Check whether the rule is for ports 80/443, and whether the target is a FQDN
if set(net_rule['destinationPorts']) in ({'80', '443'}, {'80'}, {'443'}) and len(net_rule['destinationFqdns']) > 0:
if log_level >= 7:
print('DEBUG: Transforming rule', net_rule['name'], 'to an application rule', file=sys.stderr)
if net_rc['action'] != last_action:
rule_collection = {
'name': rc_app_name + '-' + net_rc['action'] + '-' + str(len(az_app_rcs)),
'action': net_rc['action'],
'rules': []
}
# Append the rule collection to the list of rule collections and set last_action to the new value
app_rcs.append(rule_collection)
last_action = net_rc['action']
# Remove the rule from net_rules
net_rc['rules'].remove(net_rule)
# Change the rule type
net_rule['ruleType'] = 'applicationRule'
# Change the ipProtocols/destinationPorts
net_rule.pop('ipProtocols')
net_rule['protocols'] = []
if '80' in net_rule['destinationPorts']:
net_rule['protocols'].append({'protocolType': 'Http', 'port': 80})
if '443' in net_rule['destinationPorts']:
net_rule['protocols'].append({'protocolType': 'Https', 'port': 443})
net_rule['terminateTls'] = False
net_rule.pop('destinationPorts')
# Set some app rule attributes
net_rule['targetFqdns'] = net_rule['destinationFqdns']
net_rule.pop('destinationFqdns')
net_rule['targetUrls'] = []
net_rule['webCategories'] = []
net_rule['fqdnTags'] = []
# Add the rule to the last app rule collection
app_rcs[-1]['rules'].append(net_rule)
# Finished
return net_rcs, app_rcs
# Inspect both allow and deny network rules for candidates to transform into application rules
if args.use_apprules:
if log_level >= 7:
print('DEBUG: Checking whether any network rule can be transformed to an application rule', file=sys.stderr)
# az_net_rules_allow, az_app_rules_allow = create_app_rules(az_net_rules_allow, az_app_rules_allow)
# az_net_rules_deny, az_app_rules_deny = create_app_rules(az_net_rules_deny, az_app_rules_deny)
az_net_rcs, az_app_rcs = create_app_rules(az_net_rcs)
##########
# Output #
##########
# Generate JSON would be creating an object and serialize it
if args.output == "json":
api_version = "2021-08-01"
azfw_policy_name = args.policy_name
arm_template = {
'$schema': 'https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#',
'contentVersion': '1.0.0.0',
'parameters': {},
'variables': {
'location': '[resourceGroup().location]'
},
'resources': []
}
if not args.dont_create_policy:
resource_policy = {
'type': 'Microsoft.Network/firewallPolicies',
'apiVersion': api_version,
'name': azfw_policy_name,
'location': '[variables(\'location\')]',
'properties': {
'sku': {
'tier': args.policy_sku
},
'dnsSettings': {
'enableProxy': 'true'
},
'threatIntelMode': 'Alert'
}
}
arm_template['resources'].append(resource_policy)
resource_rcg = {
'type': 'Microsoft.Network/firewallPolicies/ruleCollectionGroups',
'apiVersion': api_version,
'name': azfw_policy_name + '/' + rcg_name,
'dependsOn': [],
'location': '[variables(\'location\')]',
'properties': {
'priority': rcg_prio,
'ruleCollections': []
}
}
if not args.dont_create_policy:
resource_rcg['dependsOn'].append('[resourceId(\'Microsoft.Network/firewallPolicies\', \'' + azfw_policy_name +'\')]'),
if args.use_ipgroups:
for ip_grp in ipgroups:
resource_ipgroup = {
'type': 'Microsoft.Network/ipGroups',
'apiVersion': api_version,
'name': format_to_arm_name(ip_grp['name']),
'location': '[variables(\'location\')]',
'properties': {
'ipAddresses': ip_grp['members']
}
}
arm_template['resources'].append(resource_ipgroup)
resource_rcg['dependsOn'].append("[resourceId('Microsoft.Network/ipGroups', '{0}')]".format(format_to_arm_name(ip_grp['name'])))
# Add network rule collections to the template
rc_net_prio = int(rc_net_prio_start)
for net_rc in az_net_rcs:
resource_rcg['properties']['ruleCollections'].append({
'ruleCollectionType': 'FirewallPolicyFilterRuleCollection',
'name': net_rc['name'],
'priority': str(rc_net_prio),
'action': {
'type': 'deny' if net_rc['action'] == 'Drop' else 'allow'
},
'rules': net_rc['rules']
})
rc_net_prio += 10
# Add application rule collections to the template
rc_app_prio = int(rc_app_prio_start)
for app_rc in az_app_rcs:
resource_rcg['properties']['ruleCollections'].append({
'ruleCollectionType': 'FirewallPolicyFilterRuleCollection',
'name': app_rc['name'],
'priority': str(rc_app_prio),
'action': {
'type': 'deny' if app_rc['action'] == 'Drop' else 'allow'
},
'rules': app_rc['rules']
})
rc_app_prio += 10
# if len(az_net_rules_allow) > 0:
# resource_rcg['properties']['ruleCollections'].append(resource_net_rc_allow)
# if len(az_net_rules_deny) > 0:
# resource_rcg['properties']['ruleCollections'].append(resource_net_rc_deny)
# if len(az_app_rules_allow) > 0:
# resource_rcg['properties']['ruleCollections'].append(resource_app_rc_allow)
# if len(az_app_rules_deny) > 0:
# resource_rcg['properties']['ruleCollections'].append(resource_app_rc_deny)
arm_template['resources'].append(resource_rcg)
if args.pretty:
print(json.dumps(arm_template, indent=4, sort_keys=True))
else:
print(json.dumps(arm_template))
elif args.output == "none":
if log_level >= 6:
print('INFO: No output type selected', file=sys.stderr)
else:
if log_level >= 3:
print ("ERROR: Output type", args.output, "not recognized!", file=sys.stderr)
# Last info message
if log_level >= 6:
print('INFO: Summary:', file=sys.stderr)
print('INFO: {0} Checkpoint rules analized'.format(str(cnt_chkp_rules)), file=sys.stderr)
print('INFO: {0} Azure Firewall network rules, spread across {1} rule collections ({2} allow rules, {3} deny rules)'.format(sum([len(x['rules']) for x in az_net_rcs]), len(az_net_rcs), sum([len(x['rules']) for x in az_net_rcs if x['action'] == 'Accept']), sum([len(x['rules']) for x in az_net_rcs if x['action'] == 'Drop'])), file=sys.stderr)
print('INFO: {0} Azure Firewall application rules, spread across {1} rule collections ({2} allow rules, {3} deny rules)'.format(sum([len(x['rules']) for x in az_app_rcs]), len(az_app_rcs), sum([len(x['rules']) for x in az_app_rcs if x['action'] == 'Accept']), sum([len(x['rules']) for x in az_app_rcs if x['action'] == 'Drop'])), file=sys.stderr)
print('INFO: {0} Checkpoint discarded rules:'.format(len(discarded_rules)), file=sys.stderr)
print_access_layer_rule(access_layers, discarded_rules, debug=True)